home *** CD-ROM | disk | FTP | other *** search
/ Total Network Tools 2002 / NextStepPublishing-TotalNetworkTools2002-Win95.iso / Archive / Misc Servers / Zope.exe / GFSERVE.PY < prev    next >
Encoding:
Python Source  |  2000-09-07  |  19.6 KB  |  530 lines

  1. """gadfly server mode
  2.  
  3.    script usage 
  4.    
  5.     python gfserve.py port database directory password [startup]
  6.  
  7.    test example
  8.  
  9.     python gfserve.py 2222 test dbtest admin gfstest
  10.  
  11.    port is the port to listen to
  12.    database is the database to start up. (must exist!)
  13.    directory is the directory the database is in.
  14.    password is the administrative access password.
  15.  
  16.    startup if present should be the name of a module to use
  17.    for startup.  The Startup module must contain a function
  18.    
  19.     Dict = startup(admin_policy, connection, Server_instance)
  20.        
  21.    which performs any startup actions on the database needed
  22.    and returns either None or a Dictionary of
  23.    
  24.        name > policy objects
  25.        
  26.    where the policy objects describe policies beyond the
  27.    admin policy.  The startup function may also
  28.    modify the admin_policy (disabling queries for example).
  29.  
  30.    The arguments passed to startup are:
  31.        admin_policy: the administrative policy
  32.           eg you could turn queries off for admin, using admin
  33.           only for server maintenance, or you could add prepared
  34.           queries to the admin_policy.
  35.        connection: the database connection
  36.           eg you could perform some inserts before server start
  37.           also needed to make policies.
  38.        Server_instance
  39.           Included for additional customization.
  40.  
  41.    Create policies using
  42.        P = gfserve.Policy(name, password, connection, queries=0)
  43.          -- for a "secure" policy with only prepared queries allowed,
  44.    or
  45.        P = gfserve.Policy(name, password, connection, queries=1)
  46.          -- for a policy with full access arbitrary statement
  47.             execution.
  48.  
  49.    add a "named prepared statement" to a policy using
  50.        P[name] = statement
  51.    for example
  52.        P["updatenorm"] = '''
  53.           update frequents
  54.           set bar=?, perweek=?
  55.           where drinker='norm'
  56.         '''
  57.    in this case 'updatenorm' requires 2 dynamic parameters when
  58.    invoked from a client.
  59.        
  60.    Script stdout lists server logging information.
  61.  
  62.    Some server administration services (eg shutdown)
  63.    are implemented by the script interpretation of gfclient.py.
  64. """
  65.  
  66. import socket, gadfly
  67.  
  68. from gfsocket import \
  69.    reply_exception, reply_success, Packet_Reader, certify
  70.  
  71. def main():
  72.     """start up the server."""
  73.     import sys
  74.     try:
  75.         done = 0
  76.         argv = sys.argv
  77.         nargs = len(argv)
  78.         #print nargs, argv
  79.         if nargs<5:
  80.             sys.stderr.write("gfserve: not enough arguments: %s\n\n" % argv)
  81.             sys.stderr.write(__doc__)
  82.             return
  83.         [port, db, dr, pw] = argv[1:5]
  84.         print "gfserve startup port=%s db=%s, dr=%s password omitted" % (
  85.            port, db, dr)
  86.         from string import atoi
  87.         port = atoi(port)
  88.         startup = None
  89.         if nargs>5:
  90.             startup = argv[5]
  91.             print "gfserve: load startup module %s" % startup
  92.         S = Server(port, db, dr, pw, startup)
  93.         S.init()
  94.         print "gfserve: server initialized, setting stderr=stdout"
  95.         sys.stderr = sys.stdout
  96.         print "gfserve: starting the server"
  97.         S.start()
  98.         done = 1
  99.     finally:
  100.         if not done:
  101.             print __doc__
  102.  
  103. # general error
  104. ServerError = "ServerError"
  105.  
  106. # no such prepared name
  107. PreparedNameError = "PreparedNameError"
  108.  
  109. # actions
  110.  
  111. # shut down the server (admin policy only)
  112. #   arguments = ()
  113. #   shutdown the server with no checkpoint
  114. SHUTDOWN = "SHUTDOWN"
  115.  
  116. # restart the server (admin only)
  117. #   arguments = ()
  118. #   restart the server (recover)
  119. #   no checkpoint
  120. RESTART = "RESTART"
  121.  
  122. # checkpoint the server (admin only)
  123. #   arguments = ()
  124. #   checkpoint the server
  125. CHECKPOINT = "CHECKPOINT"
  126.  
  127. # exec prepared statement
  128. #   arguments = (prepared_name_string, dyn=None)
  129. #   execute the prepared statement with dynamic args.
  130. #   autocommit.
  131. EXECUTE_PREPARED = "EXECUTE_PREPARED"
  132.  
  133. # exec any statement (only if not disabled)
  134. #   arguments = (statement_string, dyn=None)
  135. #   execute the statement with dynamic args.
  136. #   autocommit.
  137. EXECUTE_STATEMENT = "EXECUTE_STATEMENT"
  138.  
  139. ACTIONS = [SHUTDOWN, RESTART, CHECKPOINT, 
  140.            EXECUTE_PREPARED, EXECUTE_STATEMENT]
  141.            
  142. class Server:
  143.     """database server: listen for commands"""
  144.     
  145.     verbose = 1
  146.     
  147.     # wait X minutes on each server loop
  148.     select_timeout = 60*5
  149.     
  150.     # do a checkpoint each X times thru server loop
  151.     check_loop = 5
  152.  
  153.     # for now works like finger/http
  154.     #   == each command is a separate connection.
  155.     # all sql commands constitute separate transactions
  156.     #   which are automatically committed upon success.
  157.     # for now commands come in as
  158.     #  1 length (marshalled int)
  159.     #  2 (password, data) (marshalled tuple)
  160.     # responses come back as
  161.     #  1 length (marshalled int)
  162.     #  2 results (marshalled value)
  163.  
  164.     def __init__(self, port, db, dr, pw, startup=None):
  165.         self.port = port
  166.         self.db = db
  167.         self.dr = dr
  168.         self.pw = pw
  169.         self.startup = startup
  170.         self.connection = None
  171.         self.socket = None
  172.         # prepared cursors dictionary.
  173.         self.cursors = {}
  174.         self.policies = {}
  175.         self.admin_policy = None
  176.  
  177.     def start(self):
  178.         """after init, listen for commands."""
  179.         from gfsocket import READY, ERROR, unpack_certified_data
  180.         import sys
  181.         verbose = self.verbose
  182.         socket = self.socket
  183.         connection = self.connection
  184.         policies = self.policies
  185.         admin_policy = self.admin_policy
  186.         from select import select
  187.         pending_connects = {}
  188.         while 1:
  189.             try:
  190.                 # main loop
  191.                 if self.check_loop<0: self.check_loop=5
  192.                 for i in xrange(self.check_loop):
  193.                     if verbose:
  194.                         print "main loop on", socket, connection
  195.                     # checkpoint loop
  196.                     sockets = [socket]
  197.                     if pending_connects:
  198.                         sockets = sockets + pending_connects.keys()
  199.                     # wait for availability
  200.                     if verbose:
  201.                         print "server: waiting for connection(s)"
  202.                     (readables, dummy, errors) = select(\
  203.                        sockets, [], sockets[:], self.select_timeout)
  204.                     if socket in errors:
  205.                         raise ServerError, \
  206.                           "listening socket in error state: aborting"
  207.                     # clean up error connection sockets
  208.                     for s in errors:
  209.                         del pending_connects[s]
  210.                         s.close()
  211.                     # get a new connection, if available
  212.                     if socket in readables:
  213.                         readables.remove(socket)
  214.                         (conn, addr) = socket.accept()
  215.                         if 1 or verbose:
  216.                             print "connect %s" % (addr,)
  217.                         reader = Packet_Reader(conn)
  218.                         pending_connects[conn] = reader
  219.                     # poll readable pending connections, if possible
  220.                     for conn in readables:
  221.                         reader = pending_connects[conn]
  222.                         mode = reader.mode
  223.                         if not mode==READY:
  224.                             if mode == ERROR:
  225.                                 # shouldn't happen
  226.                                 try:
  227.                                     conn.close()
  228.                                     del pending_connects[conn]
  229.                                 except: pass
  230.                                 continue
  231.                             else:
  232.                                 try:
  233.                                    reader.poll()
  234.                                 finally:
  235.                                    pass # AFTER DEBUG CHANGE THIS!
  236.                     # in blocking mode, service ready request, 
  237.                     # commit on no error
  238.                     for conn in pending_connects.keys():
  239.                         reader = pending_connects[conn]
  240.                         mode = reader.mode
  241.                         if mode == ERROR:
  242.                             try:
  243.                                 del pending_connects[conn]
  244.                                 conn.close()
  245.                             except: pass
  246.                         elif mode == READY:
  247.                             try:
  248.                                 del pending_connects[conn]
  249.                                 data = reader.data
  250.                                 (actor_name, cert, md) = \
  251.                                   unpack_certified_data(data)
  252.                                 # find the policy for this actor
  253.                                 if not policies.has_key(actor_name):
  254.                                     if verbose:
  255.                                         print "no such policy: "+actor_name
  256.                                     reply_exception(NameError, 
  257.                                      "no such policy: "+actor_name, conn)
  258.                                     policy = None
  259.                                 else:
  260.                                     if verbose:
  261.                                         print "executing for", actor_name
  262.                                     policy = policies[actor_name]
  263.                                     policy.action(cert, md, conn)
  264.                             except SHUTDOWN:
  265.                                 if policy is admin_policy:
  266.                                     print \
  267.   "shutdown on admin policy: terminating"
  268.                                     connection.close()
  269.                                     socket.close()
  270.                                     # NORMAL TERMINATION:
  271.                                     return
  272.                             except RESTART:
  273.                                 if policy is admin_policy:
  274.                                     print \
  275.   "restart from admin policy: restarting connection"
  276.                                     connection.restart()
  277.                             except CHECKPOINT:
  278.                                 if policy is admin_policy:
  279.                                     print \
  280.   "checkpoint from admin policy: checkpointing now."
  281.                                     connection.checkpoint()
  282.                             except:
  283.                                 tb = sys.exc_traceback
  284.                                 info = "%s %s" % (sys.exc_type,
  285.                                              str(sys.exc_value))
  286.                                 if verbose:
  287.                                     from traceback import print_tb
  288.                                     print_tb(tb)
  289.                                 print "error in executing action: "+info
  290.                                 reply_exception(
  291.   ServerError, "exception: "+info, conn)
  292.                         #break # stop after first request serviced!
  293.             except:
  294.                 # except of main while 1 try statement
  295.                 tb = sys.exc_traceback
  296.                 ty = sys.exc_type
  297.                 va = sys.exc_value
  298.                 print "UNEXPECTED EXCEPTION ON MAINLOOP"
  299.                 from traceback import print_tb
  300.                 print_tb(tb)
  301.                 print "exception:", ty, va
  302.             if not pending_connects:
  303.                 pending_connects = {}
  304.             print "server: checkpointing"
  305.             connection.checkpoint()
  306.  
  307.     def init(self):
  308.         self.getconnection()
  309.         self.startup_load()
  310.         # get socket last in case of failure earlier
  311.         self.getsocket()
  312.         
  313.  
  314.     HOST = ""
  315.     BACKLOG = 5
  316.     
  317.     def getsocket(self):
  318.         """get the listening socket"""
  319.         verbose = self.verbose
  320.         import socket, sys
  321.         if verbose:
  322.             print "initializing listener socket"
  323.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  324.         try:
  325.             if verbose:
  326.                 print "trying to set REUSEADDR",\
  327.                        sock.getsockopt(socket.SOL_SOCKET,
  328.                           socket.SO_REUSEADDR)
  329.             sock.setsockopt(socket.SOL_SOCKET, 
  330.                    socket.SO_REUSEADDR, 1)
  331.         except: 
  332.             if verbose: 
  333.                print "set of REUSEADDR failed", sys.exc_type, sys.exc_value
  334.             pass
  335.         sock.bind((self.HOST, self.port))
  336.         sock.listen(self.BACKLOG)
  337.         self.socket = sock
  338.         return sock
  339.         
  340.     def getconnection(self):
  341.         """get the db connection"""
  342.         from gadfly import gadfly
  343.         c = self.connection = gadfly(self.db, self.dr)
  344.         # don't automatically checkpoint upon commit
  345.         c.autocheckpoint = 0
  346.  
  347.     def startup_load(self):
  348.         """setup the policies and load startup module"""
  349.         admin_policy = self.get_admin_policy()
  350.         module_name = self.startup
  351.         if module_name:
  352.             module = __import__(module_name)
  353.             # startup(admin_policy, connection, Server_instance)
  354.             test = module.startup(admin_policy, self.connection, self)
  355.             if test is not None:
  356.                 self.policies = test
  357.         self.policies["admin"] = admin_policy
  358.         
  359.     def get_admin_policy(self):
  360.         """return the admin policy for priviledged access."""
  361.         p = self.admin_policy = Policy(
  362.              "admin", self.pw, self.connection, queries=1)
  363.         return p
  364.  
  365. class Policy:
  366.     """security policy"""
  367.  
  368.     verbose = 0
  369.     
  370.     # allow arbitrary sql statments
  371.     general_queries = 0
  372.     
  373.     # dictionary of named accesses as strings
  374.     named_accesses = None
  375.     
  376.     # dictionary of prepared named accesses
  377.     prepared_cursors = None
  378.     
  379.     def __init__(self, name, password, connection, queries=0):
  380.         """create a policy (name, password, connection)
  381.         
  382.            name is the name of the policy
  383.            password is the access policy (None for no password)
  384.            connection is the database connection.
  385.            set queries to allow general accesses (unrestricted)
  386.         """
  387.         if self.verbose:  
  388.             print "policy.__init__", name
  389.         self.general_queries = queries
  390.         self.name = name
  391.         self.password = password
  392.         self.connection = connection
  393.         self.socket = None
  394.         self.named_accesses = {}
  395.         self.prepared_cursors = {}
  396.         
  397.     def __setitem__(self, name, value):
  398.         if self.verbose:
  399.             print "policy", self.name, ":", (name, value)
  400.         from types import StringType
  401.         if type(name) is not StringType or type(value) is not StringType:
  402.            raise ValueError, "cursor names and contents must be strings"
  403.         self.named_accesses[name] = value
  404.         
  405.     def execute_named(self, name, params=None):
  406.         """execute a named (prepared) sql statement"""
  407.         if self.verbose:
  408.             print "policy", self.name, "executes", name, params
  409.         na = self.named_accesses
  410.         pc = self.prepared_cursors
  411.         con = self.connection
  412.         if not na.has_key(name):
  413.             raise PreparedNameError, "unknown access name: %s" % name
  414.         stat = na[name]
  415.         if pc.has_key(name):
  416.             # get prepared query
  417.             cursor = pc[name]
  418.         else:
  419.             # prepare a new cursor
  420.             pc[name] = cursor = con.cursor()
  421.         return self.execute(cursor, stat, params)
  422.             
  423.     def execute(self, cursor, statement, params=None):
  424.         """execute a statement in a cursor"""
  425.         if self.verbose:
  426.             print "policy", self.name, "executes", statement, params
  427.         cursor.execute(statement, params)
  428.         # immediate commit!
  429.         self.connection.commit()
  430.         try:
  431.             result = cursor.fetchall()
  432.             description = cursor.description
  433.             result = (description, result)
  434.         except:
  435.             result = None
  436.         return result
  437.         
  438.     def execute_any_statement(self, statement, params=None):
  439.         """execute any statement."""
  440.         if self.verbose:
  441.             print "policy", self.name, "executes", statement, params
  442.         con = self.connection
  443.         cursor = con.cursor()
  444.         return self.execute(cursor, statement, params)
  445.         
  446.     def action(self, certificate, datastring, socket):
  447.         """perform a database/server action after checking certificate"""
  448.         verbose = self.verbose
  449.         if verbose:
  450.             print "policy", self.name, "action..."
  451.         # make sure the certificate checks out
  452.         if not self.certify(datastring, certificate, self.password):
  453.             raise ServerError, "password certification failure"
  454.         # unpack the datastring
  455.         from marshal import loads
  456.         test = loads(datastring)
  457.         #if verbose:
  458.             #print "data is", test
  459.         (action, moredata) = test
  460.         import sys
  461.         if action in ACTIONS:
  462.             action = "policy_"+action
  463.             myaction = getattr(self, action)
  464.             try:
  465.                 data = apply(myaction, moredata+(socket,))
  466.                 #self.reply_success(data)
  467.             # pass up server level requests as exceptions
  468.             except SHUTDOWN, detail:
  469.                 raise SHUTDOWN, detail
  470.             except RESTART, detail:
  471.                 raise RESTART, detail
  472.             except CHECKPOINT, detail:
  473.                 raise CHECKPOINT, detail
  474.             except:
  475.                 tb = sys.exc_traceback
  476.                 exceptiondata = "%s\n%s" %(sys.exc_type,
  477.                     str(sys.exc_value))
  478.                 if verbose:
  479.                    from traceback import print_tb
  480.                    print_tb(tb)
  481.                 self.reply_exception(ServerError, 
  482.                   "unexpected exception: "+exceptiondata, socket)
  483.                 raise ServerError, exceptiondata
  484.         else:
  485.             raise ServerError, "unknown action: "+`action`
  486.             
  487.     def certify(self, datastring, certificate, password):
  488.         # hook for subclassing
  489.         return certify(datastring, certificate, password)
  490.                 
  491.     def policy_SHUTDOWN(self, socket):
  492.         self.reply_success("attempting server shutdown", socket)
  493.         raise SHUTDOWN, "please shut down the server"
  494.     
  495.     def policy_RESTART(self, socket):
  496.         self.reply_success("attempting server restart", socket)
  497.         raise RESTART, "please restart the server"
  498.         
  499.     def policy_CHECKPOINT(self, socket):
  500.         self.reply_success("attempting server checkpoint", socket)
  501.         raise CHECKPOINT, "please checkpoint the server"
  502.         
  503.     def policy_EXECUTE_PREPARED(self, name, dyn, socket):
  504.         try:
  505.             result = self.execute_named(name, dyn)
  506.             self.reply_success(result, socket)
  507.         except PreparedNameError, detail:
  508.             self.reply_exception(PreparedNameError, 
  509.              "no such prepared statement: "+name,
  510.              socket)
  511.     
  512.     def policy_EXECUTE_STATEMENT(self, stat, dyn, socket):
  513.         if not self.general_queries:
  514.             self.reply_exception(ServerError, 
  515.                "general statements disallowed on this policy",
  516.                socket)
  517.             raise ServerError, "illegal statement attempt for: "+self.name
  518.         result = self.execute_any_statement(stat, dyn)
  519.         self.reply_success(result, socket)
  520.         
  521.     def reply_exception(self, exc, info, socket):
  522.         # hook for subclassing
  523.         reply_exception(exc, info, socket)
  524.         
  525.     def reply_success(self, data, socket):
  526.         # hook for subclassing
  527.         reply_success(data, socket)
  528.         
  529. if __name__=="__main__": main()
  530.